iT邦幫忙

2025 iThome 鐵人賽

DAY 13
0
AI & Data

從0開始的MLFLOW應用搭建系列 第 13

Day 13 – 訓練 Pipeline 結構最佳化

  • 分享至 

  • xImage
  •  

背景與目標

到目前為止,我們的 Notebook 裡有很多零散的程式碼:

  • 載入資料
  • 訓練模型(Popular Top-10、TF-IDF)
  • 記錄到 MLflow

缺點是:

  • 每次換模型就要重新複製貼上。
  • 程式碼越來越難維護。

👉 今天我們要做的事:

  1. 建立一個乾淨的 Pipeline 模組,包含:

    • load_data() → 載入資料
    • train_model() → 訓練模型
    • evaluate_and_log() → 評估並記錄到 MLflow
  2. 提供一個 run_pipeline.py,一鍵就能跑完整流程。


檔案結構

請在專案裡新增以下檔案與資料夾:

/usr/mlflow/src/pipeline/
    └── pipeline.py         # 定義 Pipeline 類別
/usr/mlflow/run_pipeline.py # 主程式,呼叫 pipeline 執行

pipeline.py

這是我們的核心模組。

📂 路徑:/usr/mlflow/src/pipeline/pipeline.py

import os
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import mlflow

DATA_DIR = "/usr/mlflow/data"

class AnimePipeline:
    def __init__(self, sample_size=1000):
        self.sample_size = sample_size

    def load_data(self):
        """載入動畫資料,只取部分樣本確保 3 分鐘內可跑完"""
        anime = pd.read_csv(os.path.join(DATA_DIR, "anime_clean.csv"))
        ratings_train = pd.read_csv(os.path.join(DATA_DIR, "ratings_train.csv"))

        # 取樣,避免全量跑太久
        anime = anime.sample(self.sample_size, random_state=42).reset_index(drop=True)
        return anime, ratings_train

    def train_model(self, anime, max_features=1000, ngram_range=(1,1), min_df=2):
        """用 TF-IDF 訓練 item-based 模型"""
        vectorizer = TfidfVectorizer(
            stop_words="english",
            max_features=max_features,
            ngram_range=ngram_range,
            min_df=min_df
        )
        tfidf = vectorizer.fit_transform(anime["genre"].fillna(""))
        sim_matrix = cosine_similarity(tfidf)
        return sim_matrix

    def evaluate_and_log(self, anime, sim_matrix, params):
        """簡單評估 Precision@10 並 log 到 MLflow"""
        def precision_at_k(recommended, relevant, k=10):
            return len(set(recommended[:k]) & set(relevant)) / k

        # 隨機測 30 部動畫,控制時間
        test_idx = np.random.choice(len(anime), 30, replace=False)
        scores = []
        for idx in test_idx:
            sim_scores = list(enumerate(sim_matrix[idx]))
            sim_scores = sorted(sim_scores, key=lambda x: x[1], reverse=True)
            top_idx = [i for i, _ in sim_scores[1:11]]
            recommended = anime.iloc[top_idx]["name"].tolist()
            relevant = anime[anime["genre"] == anime.iloc[idx]["genre"]]["name"].tolist()
            if len(relevant) > 1:
                scores.append(precision_at_k(recommended, relevant, k=10))

        avg_precision = np.mean(scores)

        with mlflow.start_run(run_name="pipeline-tfidf") as run:
            mlflow.log_params(params)          # 紀錄參數
            mlflow.log_metric("precision_at_10", avg_precision)  # 紀錄指標
            print("Run ID:", run.info.run_id)
            print("Artifact URI:", run.info.artifact_uri)

        return avg_precision

run_pipeline.py

這是 pipeline 的執行入口。

📂 路徑:/usr/mlflow/day13_run_pipeline.py

import mlflow
from src.pipeline.pipeline import AnimePipeline


mlflow.set_tracking_uri("http://mlflow:5000")
mlflow.set_experiment("anime-recsys-pipeline")

def main():
    pipeline = AnimePipeline(sample_size=1000)  # 控制資料量
    anime, ratings_train = pipeline.load_data()

    params = {
        "max_features": 1000,
        "ngram_range": (1,1),
        "min_df": 2
    }

    sim_matrix = pipeline.train_model(anime, **params)
    score = pipeline.evaluate_and_log(anime, sim_matrix, params)

    print(f"Pipeline 完成,Precision@10 = {score:.4f}")

if __name__ == "__main__":
    main()

執行方法

python-dev 容器內執行:

python day13_run_pipeline.py

https://ithelp.ithome.com.tw/upload/images/20250923/20178626BdeSfu96rQ.png

執行後,你會看到:

  • console 印出 run id、artifact uri。
  • MLflow UI 裡有新實驗 anime-recsys-pipeline,裡面有 params 和 metrics。
    https://ithelp.ithome.com.tw/upload/images/20250923/20178626ShU8ry9eKc.png

https://ithelp.ithome.com.tw/upload/images/20250923/201786266CTQNBRCR9.png


流程圖

run_pipeline.py
    │
    ▼
AnimePipeline
  ├── load_data() → 抽樣資料
  ├── train_model() → TF-IDF 訓練
  └── evaluate_and_log() → Precision@10 → MLflow

重點總結

  • 今天我們把零散的程式封裝成 Pipeline,結構更加清晰。
  • 新增了 run_pipeline.py,可以一鍵跑完流程。
  • 整個流程限制在 1000 筆樣本 + 30 筆測試,確保 3 分鐘內完成
  • 雖然目前pipline只使用到兩個檔案,但可以以此為基礎,後續再更多模組化元件的支撐下,pipline幫助我們更流暢的執行模型的流程

上一篇
Day 12 – 訓練參數自動化(Optuna + MLflow)
下一篇
Day 14 – 加入額外使用者行為特徵
系列文
從0開始的MLFLOW應用搭建23
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言